


<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
  "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">


<html xmlns="http://www.w3.org/1999/xhtml">
  <head>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
    
    <title>ZeroMQ (Scala) &mdash; Akka Documentation</title>
    
    <link rel="stylesheet" href="../_static/style.css" type="text/css" />
    <link rel="stylesheet" href="../_static/pygments.css" type="text/css" />
    <link rel="stylesheet" href="../_static/prettify.css" type="text/css" />
    <link rel="stylesheet" href="../_static/base.css" type="text/css" />
    <link rel="stylesheet" href="../_static/docs.css" type="text/css" />
    <link rel="stylesheet" href="http://fonts.googleapis.com/css?family=Exo:300,400,600,700" type="text/css" />
    
    <script type="text/javascript">
      var DOCUMENTATION_OPTIONS = {
        URL_ROOT:    '../',
        VERSION:     '2.0.1',
        COLLAPSE_INDEX: false,
        FILE_SUFFIX: '.html',
        HAS_SOURCE:  true
      };
    </script>
    <script type="text/javascript" src="../_static/jquery.js"></script>
    <script type="text/javascript" src="../_static/underscore.js"></script>
    <script type="text/javascript" src="../_static/doctools.js"></script>
    <script type="text/javascript" src="../_static/toc.js"></script>
    <script type="text/javascript" src="../_static/prettify.js"></script>
    <script type="text/javascript" src="../_static/highlightCode.js"></script>
    <script type="text/javascript" src="../_static/effects.core.js"></script>
    <script type="text/javascript" src="../_static/effects.highlight.js"></script>
    <script type="text/javascript" src="../_static/scrollTo.js"></script>
    <script type="text/javascript" src="../_static/contentsFix.js"></script>
    <link rel="shortcut icon" href="../_static/favicon.ico"/>
    <link rel="top" title="Akka Documentation" href="../index.html" />
    <link rel="up" title="Scala API" href="index.html" />
    <link rel="next" title="Java API" href="../java/index.html" />
    <link rel="prev" title="Akka Extensions (Scala)" href="extending-akka.html" /> 
  </head>
  <body>
  <div class="navbar">
    <div class="navbar-inner">
      <div class="container">
        <div class="navbar-logo">
          <a href="http://akka.io"><img src="../_static/logo-small.png" /></a>
        </div>    
        <ul class="nav">
          <li><a href="http://akka.io/docs">Documentation</a></li>
          <li><a href="http://akka.io/downloads">Download</a></li>
          <li><a href="http://groups.google.com/group/akka-user">Mailing List</a></li>
          <li><a href="http://github.com/akka/akka">Code</a></li>           
          <li><a href="http://typesafe.com/products/typesafe-subscription">Commerical Support</a></li>
        </ul>
      </div>
    </div>
  </div>
  <div class="main">
    <div class="container">
      <div class="page-title">ZeroMQ (Scala)</div><div class="pdf-link"><a href="http://akka.io/docs/akka/2.0.1/Akka.pdf"><img src="../_static/pdf-icon.png" style="height: 40px;" /></a></div></div>
    <div class="main-container">
      <div class="container">
        <div class="row">
          <div class="span12">
            <ul class="breadcrumb">           
              <li>
                 <span class="divider">|</span> <a href="../java/index.html">Java API</a> <span class="divider">»</span>
              </li>
              <li>
                <a href="../index.html">Contents</a>
              </li>
              <li>
                <span class="divider">«</span> <a href="extending-akka.html">Akka Extensions (Scala)</a> <span class="divider">|</span>
              </li>
              <li>
                Version 2.0.1
              </li>
            </ul>         
          </div>
        </div>
        <div class="row">
          <div class="span9">
            
  <div class="section" id="zeromq-scala">
<span id="id1"></span><h1>ZeroMQ (Scala)</h1>
<p>Akka provides a ZeroMQ module which abstracts a ZeroMQ connection and therefore allows interaction between Akka actors to take place over ZeroMQ connections. The messages can be of a proprietary format or they can be defined using Protobuf. The socket actor is fault-tolerant by default and when you use the newSocket method to create new sockets it will properly reinitialize the socket.</p>
<p>ZeroMQ is very opinionated when it comes to multi-threading so configuration option <cite>akka.zeromq.socket-dispatcher</cite> always needs to be configured to a PinnedDispatcher, because the actual ZeroMQ socket can only be accessed by the thread that created it.</p>
<p>The ZeroMQ module for Akka is written against an API introduced in JZMQ, which uses JNI to interact with the native ZeroMQ library. Instead of using JZMQ, the module uses ZeroMQ binding for Scala that uses the native ZeroMQ library through JNA. In other words, the only native library that this module requires is the native ZeroMQ library.
The benefit of the scala library is that you don&#8217;t need to compile and manage native dependencies at the cost of some runtime performance. The scala-bindings are compatible with the JNI bindings so they are a drop-in replacement, in case you really need to get that extra bit of performance out.</p>
<div class="section" id="connection">
<h2>Connection</h2>
<p>ZeroMQ supports multiple connectivity patterns, each aimed to meet a different set of requirements. Currently, this module supports publisher-subscriber connections and connections based on dealers and routers. For connecting or accepting connections, a socket must be created.
Sockets are always created using the <tt class="docutils literal"><span class="pre">akka.zeromq.ZeroMQExtension</span></tt>, for example:</p>
<div class="highlight-scala"><div class="highlight"><pre><span class="k">import</span> <span class="nn">akka.zeromq.ZeroMQExtension</span>
<span class="k">val</span> <span class="n">pubSocket</span> <span class="k">=</span> <span class="nc">ZeroMQExtension</span><span class="o">(</span><span class="n">system</span><span class="o">).</span><span class="n">newSocket</span><span class="o">(</span><span class="nc">SocketType</span><span class="o">.</span><span class="nc">Pub</span><span class="o">,</span> <span class="nc">Bind</span><span class="o">(</span><span class="s">&quot;tcp://127.0.0.1:1234&quot;</span><span class="o">))</span>
</pre></div>
</div>
<p>or by importing the <tt class="docutils literal"><span class="pre">akka.zeromq._</span></tt> package to make newSocket method available on system, via an implicit conversion.</p>
<div class="highlight-scala"><div class="highlight"><pre><span class="k">import</span> <span class="nn">akka.zeromq._</span>
<span class="k">val</span> <span class="n">pubSocket2</span> <span class="k">=</span> <span class="n">system</span><span class="o">.</span><span class="n">newSocket</span><span class="o">(</span><span class="nc">SocketType</span><span class="o">.</span><span class="nc">Pub</span><span class="o">,</span> <span class="nc">Bind</span><span class="o">(</span><span class="s">&quot;tcp://127.0.0.1:1234&quot;</span><span class="o">))</span>
</pre></div>
</div>
<p>Above examples will create a ZeroMQ Publisher socket that is Bound to the port 1234 on localhost.</p>
<p>Similarly you can create a subscription socket, with a listener, that subscribes to all messages from the publisher using:</p>
<div class="highlight-scala"><div class="highlight"><pre><span class="k">import</span> <span class="nn">akka.zeromq._</span>
<span class="k">val</span> <span class="n">listener</span> <span class="k">=</span> <span class="n">system</span><span class="o">.</span><span class="n">actorOf</span><span class="o">(</span><span class="nc">Props</span><span class="o">(</span><span class="k">new</span> <span class="nc">Actor</span> <span class="o">{</span>
  <span class="k">def</span> <span class="n">receive</span><span class="k">:</span> <span class="kt">Receive</span> <span class="o">=</span> <span class="o">{</span>
    <span class="k">case</span> <span class="nc">Connecting</span>    <span class="k">⇒</span> <span class="c1">//...</span>
    <span class="k">case</span> <span class="n">m</span><span class="k">:</span> <span class="kt">ZMQMessage</span> <span class="k">⇒</span> <span class="kt">//...</span>
    <span class="kt">case</span> <span class="k">_</span>             <span class="k">⇒</span> <span class="kt">//...</span>
  <span class="o">}</span>
<span class="o">}))</span>
<span class="k">val</span> <span class="n">subSocket</span> <span class="k">=</span> <span class="n">system</span><span class="o">.</span><span class="n">newSocket</span><span class="o">(</span><span class="nc">SocketType</span><span class="o">.</span><span class="nc">Sub</span><span class="o">,</span> <span class="nc">Listener</span><span class="o">(</span><span class="n">listener</span><span class="o">),</span> <span class="nc">Connect</span><span class="o">(</span><span class="s">&quot;tcp://127.0.0.1:1234&quot;</span><span class="o">),</span> <span class="nc">SubscribeAll</span><span class="o">)</span>
</pre></div>
</div>
<p>The following sub-sections describe the supported connection patterns and how they can be used in an Akka environment. However, for a comprehensive discussion of connection patterns, please refer to <a class="reference external" href="http://zguide.zeromq.org/page:all">ZeroMQ &#8211; The Guide</a>.</p>
<div class="section" id="publisher-subscriber-connection">
<h3>Publisher-Subscriber Connection</h3>
<p>In a publisher-subscriber (pub-sub) connection, the publisher accepts one or more subscribers. Each subscriber shall
subscribe to one or more topics, whereas the publisher publishes messages to a set of topics. Also, a subscriber can
subscribe to all available topics. In an Akka environment, pub-sub connections shall be used when an actor sends messages
to one or more actors that do not interact with the actor that sent the message.</p>
<p>When you&#8217;re using zeromq pub/sub you should be aware that it needs multicast - check your cloud - to work properly and that the filtering of events for topics happens client side, so all events are always broadcasted to every subscriber.</p>
<p>An actor is subscribed to a topic as follows:</p>
<div class="highlight-scala"><div class="highlight"><pre><span class="k">val</span> <span class="n">subTopicSocket</span> <span class="k">=</span> <span class="n">system</span><span class="o">.</span><span class="n">newSocket</span><span class="o">(</span><span class="nc">SocketType</span><span class="o">.</span><span class="nc">Sub</span><span class="o">,</span> <span class="nc">Listener</span><span class="o">(</span><span class="n">listener</span><span class="o">),</span> <span class="nc">Connect</span><span class="o">(</span><span class="s">&quot;tcp://127.0.0.1:1234&quot;</span><span class="o">),</span> <span class="nc">Subscribe</span><span class="o">(</span><span class="s">&quot;foo.bar&quot;</span><span class="o">))</span>
</pre></div>
</div>
<p>It is a prefix match so it is subscribed to all topics starting with <tt class="docutils literal"><span class="pre">foo.bar</span></tt>. Note that if the given string is empty or
<tt class="docutils literal"><span class="pre">SubscribeAll</span></tt> is used, the actor is subscribed to all topics.</p>
<p>To unsubscribe from a topic you do the following:</p>
<div class="highlight-scala"><div class="highlight"><pre><span class="n">subTopicSocket</span> <span class="o">!</span> <span class="nc">Unsubscribe</span><span class="o">(</span><span class="s">&quot;foo.bar&quot;</span><span class="o">)</span>
</pre></div>
</div>
<p>To publish messages to a topic you must use two Frames with the topic in the first frame.</p>
<div class="highlight-scala"><div class="highlight"><pre><span class="n">pubSocket</span> <span class="o">!</span> <span class="nc">ZMQMessage</span><span class="o">(</span><span class="nc">Seq</span><span class="o">(</span><span class="nc">Frame</span><span class="o">(</span><span class="s">&quot;foo.bar&quot;</span><span class="o">),</span> <span class="nc">Frame</span><span class="o">(</span><span class="n">payload</span><span class="o">)))</span>
</pre></div>
</div>
<div class="section" id="pub-sub-in-action">
<h4>Pub-Sub in Action</h4>
<p>The following example illustrates one publisher with two subscribers.</p>
<p>The publisher monitors current heap usage and system load and periodically publishes <tt class="docutils literal"><span class="pre">Heap</span></tt> events on the <tt class="docutils literal"><span class="pre">&quot;health.heap&quot;</span></tt> topic
and <tt class="docutils literal"><span class="pre">Load</span></tt> events on the <tt class="docutils literal"><span class="pre">&quot;health.load&quot;</span></tt> topic.</p>
<div class="highlight-scala"><div class="highlight"><pre><span class="k">import</span> <span class="nn">akka.zeromq._</span>
<span class="k">import</span> <span class="nn">akka.actor.Actor</span>
<span class="k">import</span> <span class="nn">akka.actor.Props</span>
<span class="k">import</span> <span class="nn">akka.actor.ActorLogging</span>
<span class="k">import</span> <span class="nn">akka.serialization.SerializationExtension</span>
<span class="k">import</span> <span class="nn">java.lang.management.ManagementFactory</span>

<span class="k">case</span> <span class="k">object</span> <span class="nc">Tick</span>
<span class="k">case</span> <span class="k">class</span> <span class="nc">Heap</span><span class="o">(</span><span class="n">timestamp</span><span class="k">:</span> <span class="kt">Long</span><span class="o">,</span> <span class="n">used</span><span class="k">:</span> <span class="kt">Long</span><span class="o">,</span> <span class="n">max</span><span class="k">:</span> <span class="kt">Long</span><span class="o">)</span>
<span class="k">case</span> <span class="k">class</span> <span class="nc">Load</span><span class="o">(</span><span class="n">timestamp</span><span class="k">:</span> <span class="kt">Long</span><span class="o">,</span> <span class="n">loadAverage</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)</span>

<span class="k">class</span> <span class="nc">HealthProbe</span> <span class="k">extends</span> <span class="nc">Actor</span> <span class="o">{</span>

  <span class="k">val</span> <span class="n">pubSocket</span> <span class="k">=</span> <span class="n">context</span><span class="o">.</span><span class="n">system</span><span class="o">.</span><span class="n">newSocket</span><span class="o">(</span><span class="nc">SocketType</span><span class="o">.</span><span class="nc">Pub</span><span class="o">,</span> <span class="nc">Bind</span><span class="o">(</span><span class="s">&quot;tcp://127.0.0.1:1235&quot;</span><span class="o">))</span>
  <span class="k">val</span> <span class="n">memory</span> <span class="k">=</span> <span class="nc">ManagementFactory</span><span class="o">.</span><span class="n">getMemoryMXBean</span>
  <span class="k">val</span> <span class="n">os</span> <span class="k">=</span> <span class="nc">ManagementFactory</span><span class="o">.</span><span class="n">getOperatingSystemMXBean</span>
  <span class="k">val</span> <span class="n">ser</span> <span class="k">=</span> <span class="nc">SerializationExtension</span><span class="o">(</span><span class="n">context</span><span class="o">.</span><span class="n">system</span><span class="o">)</span>

  <span class="k">override</span> <span class="k">def</span> <span class="n">preStart</span><span class="o">()</span> <span class="o">{</span>
    <span class="n">context</span><span class="o">.</span><span class="n">system</span><span class="o">.</span><span class="n">scheduler</span><span class="o">.</span><span class="n">schedule</span><span class="o">(</span><span class="mi">1</span> <span class="n">second</span><span class="o">,</span> <span class="mi">1</span> <span class="n">second</span><span class="o">,</span> <span class="n">self</span><span class="o">,</span> <span class="nc">Tick</span><span class="o">)</span>
  <span class="o">}</span>

  <span class="k">override</span> <span class="k">def</span> <span class="n">postRestart</span><span class="o">(</span><span class="n">reason</span><span class="k">:</span> <span class="kt">Throwable</span><span class="o">)</span> <span class="o">{</span>
    <span class="c1">// don&#39;t call preStart, only schedule once</span>
  <span class="o">}</span>

  <span class="k">def</span> <span class="n">receive</span><span class="k">:</span> <span class="kt">Receive</span> <span class="o">=</span> <span class="o">{</span>
    <span class="k">case</span> <span class="nc">Tick</span> <span class="k">⇒</span>
      <span class="k">val</span> <span class="n">currentHeap</span> <span class="k">=</span> <span class="n">memory</span><span class="o">.</span><span class="n">getHeapMemoryUsage</span>
      <span class="k">val</span> <span class="n">timestamp</span> <span class="k">=</span> <span class="nc">System</span><span class="o">.</span><span class="n">currentTimeMillis</span>

      <span class="c1">// use akka SerializationExtension to convert to bytes</span>
      <span class="k">val</span> <span class="n">heapPayload</span> <span class="k">=</span> <span class="n">ser</span><span class="o">.</span><span class="n">serialize</span><span class="o">(</span><span class="nc">Heap</span><span class="o">(</span><span class="n">timestamp</span><span class="o">,</span> <span class="n">currentHeap</span><span class="o">.</span><span class="n">getUsed</span><span class="o">,</span> <span class="n">currentHeap</span><span class="o">.</span><span class="n">getMax</span><span class="o">)).</span><span class="n">fold</span><span class="o">(</span><span class="k">throw</span> <span class="n">_</span><span class="o">,</span> <span class="n">identity</span><span class="o">)</span>
      <span class="c1">// the first frame is the topic, second is the message</span>
      <span class="n">pubSocket</span> <span class="o">!</span> <span class="nc">ZMQMessage</span><span class="o">(</span><span class="nc">Seq</span><span class="o">(</span><span class="nc">Frame</span><span class="o">(</span><span class="s">&quot;health.heap&quot;</span><span class="o">),</span> <span class="nc">Frame</span><span class="o">(</span><span class="n">heapPayload</span><span class="o">)))</span>

      <span class="c1">// use akka SerializationExtension to convert to bytes</span>
      <span class="k">val</span> <span class="n">loadPayload</span> <span class="k">=</span> <span class="n">ser</span><span class="o">.</span><span class="n">serialize</span><span class="o">(</span><span class="nc">Load</span><span class="o">(</span><span class="n">timestamp</span><span class="o">,</span> <span class="n">os</span><span class="o">.</span><span class="n">getSystemLoadAverage</span><span class="o">)).</span><span class="n">fold</span><span class="o">(</span><span class="k">throw</span> <span class="n">_</span><span class="o">,</span> <span class="n">identity</span><span class="o">)</span>
      <span class="c1">// the first frame is the topic, second is the message</span>
      <span class="n">pubSocket</span> <span class="o">!</span> <span class="nc">ZMQMessage</span><span class="o">(</span><span class="nc">Seq</span><span class="o">(</span><span class="nc">Frame</span><span class="o">(</span><span class="s">&quot;health.load&quot;</span><span class="o">),</span> <span class="nc">Frame</span><span class="o">(</span><span class="n">loadPayload</span><span class="o">)))</span>
  <span class="o">}</span>
<span class="o">}</span>

  <span class="n">system</span><span class="o">.</span><span class="n">actorOf</span><span class="o">(</span><span class="nc">Props</span><span class="o">[</span><span class="kt">HealthProbe</span><span class="o">],</span> <span class="n">name</span> <span class="k">=</span> <span class="s">&quot;health&quot;</span><span class="o">)</span>
</pre></div>
</div>
<p>Let&#8217;s add one subscriber that logs the information. It subscribes to all topics starting with <tt class="docutils literal"><span class="pre">&quot;health&quot;</span></tt>, i.e. both <tt class="docutils literal"><span class="pre">Heap</span></tt> and
<tt class="docutils literal"><span class="pre">Load</span></tt> events.</p>
<div class="highlight-scala"><div class="highlight"><pre><span class="k">class</span> <span class="nc">Logger</span> <span class="k">extends</span> <span class="nc">Actor</span> <span class="k">with</span> <span class="nc">ActorLogging</span> <span class="o">{</span>

  <span class="n">context</span><span class="o">.</span><span class="n">system</span><span class="o">.</span><span class="n">newSocket</span><span class="o">(</span><span class="nc">SocketType</span><span class="o">.</span><span class="nc">Sub</span><span class="o">,</span> <span class="nc">Listener</span><span class="o">(</span><span class="n">self</span><span class="o">),</span> <span class="nc">Connect</span><span class="o">(</span><span class="s">&quot;tcp://127.0.0.1:1235&quot;</span><span class="o">),</span> <span class="nc">Subscribe</span><span class="o">(</span><span class="s">&quot;health&quot;</span><span class="o">))</span>
  <span class="k">val</span> <span class="n">ser</span> <span class="k">=</span> <span class="nc">SerializationExtension</span><span class="o">(</span><span class="n">context</span><span class="o">.</span><span class="n">system</span><span class="o">)</span>
  <span class="k">val</span> <span class="n">timestampFormat</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">SimpleDateFormat</span><span class="o">(</span><span class="s">&quot;HH:mm:ss.SSS&quot;</span><span class="o">)</span>

  <span class="k">def</span> <span class="n">receive</span> <span class="k">=</span> <span class="o">{</span>
    <span class="c1">// the first frame is the topic, second is the message</span>
    <span class="k">case</span> <span class="n">m</span><span class="k">:</span> <span class="kt">ZMQMessage</span> <span class="kt">if</span> <span class="kt">m.firstFrameAsString</span> <span class="o">=</span><span class="k">=</span> <span class="s">&quot;health.heap&quot;</span> <span class="k">⇒</span>
      <span class="n">ser</span><span class="o">.</span><span class="n">deserialize</span><span class="o">(</span><span class="n">m</span><span class="o">.</span><span class="n">payload</span><span class="o">(</span><span class="mi">1</span><span class="o">),</span> <span class="n">classOf</span><span class="o">[</span><span class="kt">Heap</span><span class="o">])</span> <span class="k">match</span> <span class="o">{</span>
        <span class="k">case</span> <span class="nc">Right</span><span class="o">(</span><span class="nc">Heap</span><span class="o">(</span><span class="n">timestamp</span><span class="o">,</span> <span class="n">used</span><span class="o">,</span> <span class="n">max</span><span class="o">))</span> <span class="k">⇒</span>
          <span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="o">(</span><span class="s">&quot;Used heap {} bytes, at {}&quot;</span><span class="o">,</span> <span class="n">used</span><span class="o">,</span> <span class="n">timestampFormat</span><span class="o">.</span><span class="n">format</span><span class="o">(</span><span class="k">new</span> <span class="nc">Date</span><span class="o">(</span><span class="n">timestamp</span><span class="o">)))</span>
        <span class="k">case</span> <span class="nc">Left</span><span class="o">(</span><span class="n">e</span><span class="o">)</span> <span class="k">⇒</span> <span class="k">throw</span> <span class="n">e</span>
      <span class="o">}</span>

    <span class="k">case</span> <span class="n">m</span><span class="k">:</span> <span class="kt">ZMQMessage</span> <span class="kt">if</span> <span class="kt">m.firstFrameAsString</span> <span class="o">=</span><span class="k">=</span> <span class="s">&quot;health.load&quot;</span> <span class="k">⇒</span>
      <span class="n">ser</span><span class="o">.</span><span class="n">deserialize</span><span class="o">(</span><span class="n">m</span><span class="o">.</span><span class="n">payload</span><span class="o">(</span><span class="mi">1</span><span class="o">),</span> <span class="n">classOf</span><span class="o">[</span><span class="kt">Load</span><span class="o">])</span> <span class="k">match</span> <span class="o">{</span>
        <span class="k">case</span> <span class="nc">Right</span><span class="o">(</span><span class="nc">Load</span><span class="o">(</span><span class="n">timestamp</span><span class="o">,</span> <span class="n">loadAverage</span><span class="o">))</span> <span class="k">⇒</span>
          <span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="o">(</span><span class="s">&quot;Load average {}, at {}&quot;</span><span class="o">,</span> <span class="n">loadAverage</span><span class="o">,</span> <span class="n">timestampFormat</span><span class="o">.</span><span class="n">format</span><span class="o">(</span><span class="k">new</span> <span class="nc">Date</span><span class="o">(</span><span class="n">timestamp</span><span class="o">)))</span>
        <span class="k">case</span> <span class="nc">Left</span><span class="o">(</span><span class="n">e</span><span class="o">)</span> <span class="k">⇒</span> <span class="k">throw</span> <span class="n">e</span>
      <span class="o">}</span>
  <span class="o">}</span>
<span class="o">}</span>

  <span class="n">system</span><span class="o">.</span><span class="n">actorOf</span><span class="o">(</span><span class="nc">Props</span><span class="o">[</span><span class="kt">Logger</span><span class="o">],</span> <span class="n">name</span> <span class="k">=</span> <span class="s">&quot;logger&quot;</span><span class="o">)</span>
</pre></div>
</div>
<p>Another subscriber keep track of used heap and warns if too much heap is used. It only subscribes to <tt class="docutils literal"><span class="pre">Heap</span></tt> events.</p>
<div class="highlight-scala"><div class="highlight"><pre><span class="k">class</span> <span class="nc">HeapAlerter</span> <span class="k">extends</span> <span class="nc">Actor</span> <span class="k">with</span> <span class="nc">ActorLogging</span> <span class="o">{</span>

  <span class="n">context</span><span class="o">.</span><span class="n">system</span><span class="o">.</span><span class="n">newSocket</span><span class="o">(</span><span class="nc">SocketType</span><span class="o">.</span><span class="nc">Sub</span><span class="o">,</span> <span class="nc">Listener</span><span class="o">(</span><span class="n">self</span><span class="o">),</span> <span class="nc">Connect</span><span class="o">(</span><span class="s">&quot;tcp://127.0.0.1:1235&quot;</span><span class="o">),</span> <span class="nc">Subscribe</span><span class="o">(</span><span class="s">&quot;health.heap&quot;</span><span class="o">))</span>
  <span class="k">val</span> <span class="n">ser</span> <span class="k">=</span> <span class="nc">SerializationExtension</span><span class="o">(</span><span class="n">context</span><span class="o">.</span><span class="n">system</span><span class="o">)</span>
  <span class="k">var</span> <span class="n">count</span> <span class="k">=</span> <span class="mi">0</span>

  <span class="k">def</span> <span class="n">receive</span> <span class="k">=</span> <span class="o">{</span>
    <span class="c1">// the first frame is the topic, second is the message</span>
    <span class="k">case</span> <span class="n">m</span><span class="k">:</span> <span class="kt">ZMQMessage</span> <span class="kt">if</span> <span class="kt">m.firstFrameAsString</span> <span class="o">=</span><span class="k">=</span> <span class="s">&quot;health.heap&quot;</span> <span class="k">⇒</span>
      <span class="n">ser</span><span class="o">.</span><span class="n">deserialize</span><span class="o">(</span><span class="n">m</span><span class="o">.</span><span class="n">payload</span><span class="o">(</span><span class="mi">1</span><span class="o">),</span> <span class="n">classOf</span><span class="o">[</span><span class="kt">Heap</span><span class="o">])</span> <span class="k">match</span> <span class="o">{</span>
        <span class="k">case</span> <span class="nc">Right</span><span class="o">(</span><span class="nc">Heap</span><span class="o">(</span><span class="n">timestamp</span><span class="o">,</span> <span class="n">used</span><span class="o">,</span> <span class="n">max</span><span class="o">))</span> <span class="k">⇒</span>
          <span class="k">if</span> <span class="o">((</span><span class="n">used</span><span class="o">.</span><span class="n">toDouble</span> <span class="o">/</span> <span class="n">max</span><span class="o">)</span> <span class="o">&gt;</span> <span class="mf">0.9</span><span class="o">)</span> <span class="n">count</span> <span class="o">+=</span> <span class="mi">1</span>
          <span class="k">else</span> <span class="n">count</span> <span class="k">=</span> <span class="mi">0</span>
          <span class="k">if</span> <span class="o">(</span><span class="n">count</span> <span class="o">&gt;</span> <span class="mi">10</span><span class="o">)</span> <span class="n">log</span><span class="o">.</span><span class="n">warning</span><span class="o">(</span><span class="s">&quot;Need more memory, using {} %&quot;</span><span class="o">,</span> <span class="o">(</span><span class="mf">100.0</span> <span class="o">*</span> <span class="n">used</span> <span class="o">/</span> <span class="n">max</span><span class="o">))</span>
        <span class="k">case</span> <span class="nc">Left</span><span class="o">(</span><span class="n">e</span><span class="o">)</span> <span class="k">⇒</span> <span class="k">throw</span> <span class="n">e</span>
      <span class="o">}</span>
  <span class="o">}</span>
<span class="o">}</span>

  <span class="n">system</span><span class="o">.</span><span class="n">actorOf</span><span class="o">(</span><span class="nc">Props</span><span class="o">[</span><span class="kt">HeapAlerter</span><span class="o">],</span> <span class="n">name</span> <span class="k">=</span> <span class="s">&quot;alerter&quot;</span><span class="o">)</span>
</pre></div>
</div>
</div>
</div>
<div class="section" id="router-dealer-connection">
<h3>Router-Dealer Connection</h3>
<p>While Pub/Sub is nice the real advantage of zeromq is that it is a &#8220;lego-box&#8221; for reliable messaging. And because there are so many integrations the multi-language support is fantastic.
When you&#8217;re using ZeroMQ to integrate many systems you&#8217;ll probably need to build your own ZeroMQ devices. This is where the router and dealer socket types come in handy.
With those socket types you can build your own reliable pub sub broker that uses TCP/IP and does publisher side filtering of events.</p>
<p>To create a Router socket that has a high watermark configured, you would do:</p>
<div class="highlight-scala"><div class="highlight"><pre><span class="k">val</span> <span class="n">highWatermarkSocket</span> <span class="k">=</span> <span class="n">system</span><span class="o">.</span><span class="n">newSocket</span><span class="o">(</span>
  <span class="nc">SocketType</span><span class="o">.</span><span class="nc">Router</span><span class="o">,</span>
  <span class="nc">Listener</span><span class="o">(</span><span class="n">listener</span><span class="o">),</span>
  <span class="nc">Bind</span><span class="o">(</span><span class="s">&quot;tcp://127.0.0.1:1234&quot;</span><span class="o">),</span>
  <span class="nc">HighWatermark</span><span class="o">(</span><span class="mi">50000</span><span class="o">))</span>
</pre></div>
</div>
<p>The akka-zeromq module accepts most if not all the available configuration options for a zeromq socket.</p>
</div>
<div class="section" id="push-pull-connection">
<h3>Push-Pull Connection</h3>
<p>Akka ZeroMQ module supports <tt class="docutils literal"><span class="pre">Push-Pull</span></tt> connections.</p>
<p>You can create a <tt class="docutils literal"><span class="pre">Push</span></tt> connection through the:</p>
<div class="highlight-scala"><div class="highlight"><pre><span class="k">def</span> <span class="n">newPushSocket</span><span class="o">(</span><span class="n">socketParameters</span><span class="k">:</span> <span class="kt">Array</span><span class="o">[</span><span class="kt">SocketOption</span><span class="o">])</span><span class="k">:</span> <span class="kt">ActorRef</span>
</pre></div>
</div>
<p>You can create a <tt class="docutils literal"><span class="pre">Pull</span></tt> connection through the:</p>
<div class="highlight-scala"><div class="highlight"><pre><span class="k">def</span> <span class="n">newPullSocket</span><span class="o">(</span><span class="n">socketParameters</span><span class="k">:</span> <span class="kt">Array</span><span class="o">[</span><span class="kt">SocketOption</span><span class="o">])</span><span class="k">:</span> <span class="kt">ActorRef</span>
</pre></div>
</div>
<p>More documentation and examples will follow soon.</p>
</div>
<div class="section" id="rep-req-connection">
<h3>Rep-Req Connection</h3>
<p>Akka ZeroMQ module supports <tt class="docutils literal"><span class="pre">Rep-Req</span></tt> connections.</p>
<p>You can create a <tt class="docutils literal"><span class="pre">Rep</span></tt> connection through the:</p>
<div class="highlight-scala"><div class="highlight"><pre><span class="k">def</span> <span class="n">newRepSocket</span><span class="o">(</span><span class="n">socketParameters</span><span class="k">:</span> <span class="kt">Array</span><span class="o">[</span><span class="kt">SocketOption</span><span class="o">])</span><span class="k">:</span> <span class="kt">ActorRef</span>
</pre></div>
</div>
<p>You can create a <tt class="docutils literal"><span class="pre">Req</span></tt> connection through the:</p>
<div class="highlight-scala"><div class="highlight"><pre><span class="k">def</span> <span class="n">newReqSocket</span><span class="o">(</span><span class="n">socketParameters</span><span class="k">:</span> <span class="kt">Array</span><span class="o">[</span><span class="kt">SocketOption</span><span class="o">])</span><span class="k">:</span> <span class="kt">ActorRef</span>
</pre></div>
</div>
<p>More documentation and examples will follow soon.</p>
</div>
</div>
</div>


          </div>
          <div class="span3"><p class="contents-title">Contents</p>
              <div id="scroller-anchor">
                <div id="scroller">
                  <div id="toc"></div>
                </div>
              </div></div>
        </div>
      </div>
    </div>
  </div>
  <div class="footer">
  <div class="container">
    <ul>
      <li><h5>Akka</h5></li>
      <li><a href="http://akka.io/docs">Documentation</a></li>
      <li><a href="http://akka.io/downloads">Downloads</a></li>
    </ul>
    <ul>
      <li><h5>Contribute</h5></li>
      <li><a href="http://github.com/akka/akka">Source Code</a></li>
      <li><a href="http://groups.google.com/group/akka-user">Mailing List</a></li>      
      <li><a href="http://www.assembla.com/spaces/akka/tickets">Report a Bug</a></li>      
    </ul>
    <ul>
      <li><h5>Company</h5></li>
      <li><a href="http://typesafe.com/products/typesafe-subscription">Commercial Support</a></li>
      <li><a href="http://akka.io/team">Team</a></li>
      <li><a href="mailto:info@typesafe.com">Contact</a></li>
    </ul>
    <ul>
      <li><img src="../_static/watermark.png" align="center"/></li>
    </ul>
  </div>
  <div class="container copyright">
    <p style="float: left;">
      © 2012 <a href="http://typesafe.com/">Typesafe Inc.</a> <span class="license">Akka is Open Source and available under the Apache 2 License.</span>
    </p>
    <p style="float: right; font-size: 12px;">
      Last updated: Apr 13, 2012
    </p>          
  </div>
</div>
<script type="text/javascript">
  $('#toc').toc();
</script>
  

  </body>
</html>